package com.atguigu.source;

import com.atguigu.beans.WaterSensor;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

/**
 * Flink source that emits a randomly generated {@link WaterSensor} record
 * roughly once per second until {@link #cancel()} is called.
 */
public class RandomWaterSenson implements SourceFunction<WaterSensor> {
    /**
     * Loop guard for {@link #run}. Declared {@code volatile} because
     * {@link #cancel()} is invoked from a different thread than the one
     * executing {@link #run}; without it the write in {@code cancel()} is not
     * guaranteed to become visible to the emitting loop.
     */
    volatile boolean flag = true;

    /**
     * Generates data: emits one record per iteration, then sleeps for one second.
     *
     * <p>Each record is built from the id {@code "sensor" + random.nextInt()}
     * (any int, possibly negative), the current wall-clock time in epoch
     * milliseconds, and a random int in {@code [0, 99)}.
     *
     * @param sourceContext context used to emit the generated records
     * @throws Exception if the thread is interrupted while sleeping, or if
     *                   emitting a record fails
     */
    @Override
    public void run(SourceContext<WaterSensor> sourceContext) throws Exception {
        Random random = new Random();
        while (flag) {
            sourceContext.collect(new WaterSensor(
                    "sensor" + random.nextInt(),
                    // Same epoch-millis value Calendar.getInstance().getTimeInMillis()
                    // produced, without allocating a Calendar on every iteration.
                    System.currentTimeMillis(),
                    random.nextInt(99)
            ));
            Thread.sleep(1000);
        }
    }

    /**
     * Stops {@link #run}: clears the loop guard so the emitting loop exits
     * the next time it checks the flag.
     */
    @Override
    public void cancel() {
        flag = false;
    }
}
